/*
 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 *
 */

package com.sun.corba.se.impl.transport;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.Hashtable;
import java.util.HashMap;
import java.util.Map;

import org.omg.CORBA.COMM_FAILURE;
import org.omg.CORBA.CompletionStatus;
import org.omg.CORBA.DATA_CONVERSION;
import org.omg.CORBA.INTERNAL;
import org.omg.CORBA.MARSHAL;
import org.omg.CORBA.OBJECT_NOT_EXIST;
import org.omg.CORBA.SystemException;

import com.sun.org.omg.SendingContext.CodeBase;

import com.sun.corba.se.pept.broker.Broker;
import com.sun.corba.se.pept.encoding.InputObject;
import com.sun.corba.se.pept.encoding.OutputObject;
import com.sun.corba.se.pept.protocol.MessageMediator;
import com.sun.corba.se.pept.transport.Acceptor;
import com.sun.corba.se.pept.transport.Connection;
import com.sun.corba.se.pept.transport.ConnectionCache;
import com.sun.corba.se.pept.transport.ContactInfo;
import com.sun.corba.se.pept.transport.EventHandler;
import com.sun.corba.se.pept.transport.InboundConnectionCache;
import com.sun.corba.se.pept.transport.OutboundConnectionCache;
import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
import com.sun.corba.se.pept.transport.Selector;

import com.sun.corba.se.spi.ior.IOR;
import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
import com.sun.corba.se.spi.logging.CORBALogDomains;
import com.sun.corba.se.spi.orb.ORB;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
import com.sun.corba.se.spi.transport.CorbaContactInfo;
import com.sun.corba.se.spi.transport.CorbaConnection;
import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
import com.sun.corba.se.spi.transport.ReadTimeouts;

import com.sun.corba.se.impl.encoding.CachedCodeBase;
import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
import com.sun.corba.se.impl.encoding.CDROutputObject;
import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
import com.sun.corba.se.impl.orbutil.ORBConstants;
import com.sun.corba.se.impl.orbutil.ORBUtility;
import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;

/**
 * @author Harold Carr
 */
public class SocketOrChannelConnectionImpl
    extends
    EventHandlerBase
    implements
    CorbaConnection,
    Work {

  public static boolean dprintWriteLocks = false;

  //
  // New transport.
  //

  protected long enqueueTime;

  protected SocketChannel socketChannel;

  public SocketChannel getSocketChannel() {
    return socketChannel;
  }

  // REVISIT:
  // protected for test: genericRPCMSGFramework.IIOPConnection constructor.
  protected CorbaContactInfo contactInfo;
  protected Acceptor acceptor;
  protected ConnectionCache connectionCache;

  //
  // From iiop.Connection.java
  //

  protected Socket socket;    // The socket used for this connection.
  protected long timeStamp = 0;
  protected boolean isServer = false;

  // Start at some value other than zero since this is a magic
  // value in some protocols.
  protected int requestId = 5;
  protected CorbaResponseWaitingRoom responseWaitingRoom;
  protected int state;
  protected java.lang.Object stateEvent = new java.lang.Object();
  protected java.lang.Object writeEvent = new java.lang.Object();
  protected boolean writeLocked;
  protected int serverRequestCount = 0;

  // Server request map: used on the server side of Connection
  // Maps request ID to IIOPInputStream.
  Map serverRequestMap = null;

  // This is a flag associated per connection telling us if the
  // initial set of sending contexts were sent to the receiver
  // already...
  protected boolean postInitialContexts = false;

  // Remote reference to CodeBase server (supplies
  // FullValueDescription, among other things)
  protected IOR codeBaseServerIOR;

  // CodeBase cache for this connection.  This will cache remote operations,
  // handle connecting, and ensure we don't do any remote operations until
  // necessary.
  protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);

  protected ORBUtilSystemException wrapper;

  // transport read timeout values
  protected ReadTimeouts readTimeouts;

  protected boolean shouldReadGiopHeaderOnly;

  // A message mediator used when shouldReadGiopHeaderOnly is
  // true to maintain request message state across execution in a
  // SelectorThread and WorkerThread.
  protected CorbaMessageMediator partialMessageMediator = null;

  // Used in genericRPCMSGFramework test.
  protected SocketOrChannelConnectionImpl(ORB orb) {
    this.orb = orb;
    wrapper = ORBUtilSystemException.get(orb,
        CORBALogDomains.RPC_TRANSPORT);

    setWork(this);
    responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
    setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
  }

  // Both client and servers.
  protected SocketOrChannelConnectionImpl(ORB orb,
      boolean useSelectThreadToWait,
      boolean useWorkerThread) {
    this(orb);
    setUseSelectThreadToWait(useSelectThreadToWait);
    setUseWorkerThreadForEvent(useWorkerThread);
  }

  // Client constructor.
  public SocketOrChannelConnectionImpl(ORB orb,
      CorbaContactInfo contactInfo,
      boolean useSelectThreadToWait,
      boolean useWorkerThread,
      String socketType,
      String hostname,
      int port) {
    this(orb, useSelectThreadToWait, useWorkerThread);

    this.contactInfo = contactInfo;

    try {
      socket = orb.getORBData().getSocketFactory()
          .createSocket(socketType,
              new InetSocketAddress(hostname, port));
      socketChannel = socket.getChannel();

      if (socketChannel != null) {
        boolean isBlocking = !useSelectThreadToWait;
        socketChannel.configureBlocking(isBlocking);
      } else {
        // IMPORTANT: non-channel-backed sockets must use
        // dedicated reader threads.
        setUseSelectThreadToWait(false);
      }
      if (orb.transportDebugFlag) {
        dprint(".initialize: connection created: " + socket);
      }
    } catch (Throwable t) {
      throw wrapper.connectFailure(t, socketType, hostname,
          Integer.toString(port));
    }
    state = OPENING;
  }

  // Client-side convenience.
  public SocketOrChannelConnectionImpl(ORB orb,
      CorbaContactInfo contactInfo,
      String socketType,
      String hostname,
      int port) {
    this(orb, contactInfo,
        orb.getORBData().connectionSocketUseSelectThreadToWait(),
        orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
        socketType, hostname, port);
  }

  // Server-side constructor.
  public SocketOrChannelConnectionImpl(ORB orb,
      Acceptor acceptor,
      Socket socket,
      boolean useSelectThreadToWait,
      boolean useWorkerThread) {
    this(orb, useSelectThreadToWait, useWorkerThread);

    this.socket = socket;
    socketChannel = socket.getChannel();
    if (socketChannel != null) {
      // REVISIT
      try {
        boolean isBlocking = !useSelectThreadToWait;
        socketChannel.configureBlocking(isBlocking);
      } catch (IOException e) {
        RuntimeException rte = new RuntimeException();
        rte.initCause(e);
        throw rte;
      }
    }
    this.acceptor = acceptor;

    serverRequestMap = Collections.synchronizedMap(new HashMap());
    isServer = true;

    state = ESTABLISHED;
  }

  // Server-side convenience
  public SocketOrChannelConnectionImpl(ORB orb,
      Acceptor acceptor,
      Socket socket) {
    this(orb, acceptor, socket,
        (socket.getChannel() == null
            ? false
            : orb.getORBData().connectionSocketUseSelectThreadToWait()),
        (socket.getChannel() == null
            ? false
            : orb.getORBData().connectionSocketUseWorkerThreadForEvent()));
  }

  ////////////////////////////////////////////////////
  //
  // framework.transport.Connection
  //

  public boolean shouldRegisterReadEvent() {
    return true;
  }

  public boolean shouldRegisterServerReadEvent() {
    return true;
  }

  public boolean read() {
    try {
      if (orb.transportDebugFlag) {
        dprint(".read->: " + this);
      }
      CorbaMessageMediator messageMediator = readBits();
      if (messageMediator != null) {
        // Null can happen when client closes stream
        // causing purgecalls.
        return dispatch(messageMediator);
      }
      return true;
    } finally {
      if (orb.transportDebugFlag) {
        dprint(".read<-: " + this);
      }
    }
  }

  protected CorbaMessageMediator readBits() {
    try {

      if (orb.transportDebugFlag) {
        dprint(".readBits->: " + this);
      }

      MessageMediator messageMediator;
      // REVISIT - use common factory base class.
      if (contactInfo != null) {
        messageMediator =
            contactInfo.createMessageMediator(orb, this);
      } else if (acceptor != null) {
        messageMediator = acceptor.createMessageMediator(orb, this);
      } else {
        throw
            new RuntimeException("SocketOrChannelConnectionImpl.readBits");
      }
      return (CorbaMessageMediator) messageMediator;

    } catch (ThreadDeath td) {
      if (orb.transportDebugFlag) {
        dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
      }
      try {
        purgeCalls(wrapper.connectionAbort(td), false, false);
      } catch (Throwable t) {
        if (orb.transportDebugFlag) {
          dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
        }
      }
      throw td;
    } catch (Throwable ex) {
      if (orb.transportDebugFlag) {
        dprint(".readBits: " + this + ": Throwable: " + ex, ex);
      }

      try {
        if (ex instanceof INTERNAL) {
          sendMessageError(GIOPVersion.DEFAULT_VERSION);
        }
      } catch (IOException e) {
        if (orb.transportDebugFlag) {
          dprint(".readBits: " + this +
              ": sendMessageError: IOException: " + e, e);
        }
      }
      // REVISIT - make sure reader thread is killed.
      orb.getTransportManager().getSelector(0).unregisterForEvent(this);
      // Notify anyone waiting.
      purgeCalls(wrapper.connectionAbort(ex), true, false);
      // REVISIT
      //keepRunning = false;
      // REVISIT - if this is called after purgeCalls then
      // the state of the socket is ABORT so the writeLock
      // in close throws an exception.  It is ignored but
      // causes IBM (screen scraping) tests to fail.
      //close();
    } finally {
      if (orb.transportDebugFlag) {
        dprint(".readBits<-: " + this);
      }
    }
    return null;
  }

  protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator) {
    try {

      if (orb.transportDebugFlag) {
        dprint(".finishReadingBits->: " + this);
      }

      // REVISIT - use common factory base class.
      if (contactInfo != null) {
        messageMediator =
            contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
      } else if (acceptor != null) {
        messageMediator =
            acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
      } else {
        throw
            new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits");
      }
      return (CorbaMessageMediator) messageMediator;

    } catch (ThreadDeath td) {
      if (orb.transportDebugFlag) {
        dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
      }
      try {
        purgeCalls(wrapper.connectionAbort(td), false, false);
      } catch (Throwable t) {
        if (orb.transportDebugFlag) {
          dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
        }
      }
      throw td;
    } catch (Throwable ex) {
      if (orb.transportDebugFlag) {
        dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
      }

      try {
        if (ex instanceof INTERNAL) {
          sendMessageError(GIOPVersion.DEFAULT_VERSION);
        }
      } catch (IOException e) {
        if (orb.transportDebugFlag) {
          dprint(".finishReadingBits: " + this +
              ": sendMessageError: IOException: " + e, e);
        }
      }
      // REVISIT - make sure reader thread is killed.
      orb.getTransportManager().getSelector(0).unregisterForEvent(this);
      // Notify anyone waiting.
      purgeCalls(wrapper.connectionAbort(ex), true, false);
      // REVISIT
      //keepRunning = false;
      // REVISIT - if this is called after purgeCalls then
      // the state of the socket is ABORT so the writeLock
      // in close throws an exception.  It is ignored but
      // causes IBM (screen scraping) tests to fail.
      //close();
    } finally {
      if (orb.transportDebugFlag) {
        dprint(".finishReadingBits<-: " + this);
      }
    }
    return null;
  }

  protected boolean dispatch(CorbaMessageMediator messageMediator) {
    try {
      if (orb.transportDebugFlag) {
        dprint(".dispatch->: " + this);
      }

      //
      // NOTE:
      //
      // This call is the transition from the tranport block
      // to the protocol block.
      //

      boolean result =
          messageMediator.getProtocolHandler()
              .handleRequest(messageMediator);

      return result;

    } catch (ThreadDeath td) {
      if (orb.transportDebugFlag) {
        dprint(".dispatch: ThreadDeath", td);
      }
      try {
        purgeCalls(wrapper.connectionAbort(td), false, false);
      } catch (Throwable t) {
        if (orb.transportDebugFlag) {
          dprint(".dispatch: purgeCalls: Throwable", t);
        }
      }
      throw td;
    } catch (Throwable ex) {
      if (orb.transportDebugFlag) {
        dprint(".dispatch: Throwable", ex);
      }

      try {
        if (ex instanceof INTERNAL) {
          sendMessageError(GIOPVersion.DEFAULT_VERSION);
        }
      } catch (IOException e) {
        if (orb.transportDebugFlag) {
          dprint(".dispatch: sendMessageError: IOException", e);
        }
      }
      purgeCalls(wrapper.connectionAbort(ex), false, false);
      // REVISIT
      //keepRunning = false;
    } finally {
      if (orb.transportDebugFlag) {
        dprint(".dispatch<-: " + this);
      }
    }

    return true;
  }

  public boolean shouldUseDirectByteBuffers() {
    return getSocketChannel() != null;
  }

  public ByteBuffer read(int size, int offset, int length, long max_wait_time)
      throws IOException {
    if (shouldUseDirectByteBuffers()) {

      ByteBuffer byteBuffer =
          orb.getByteBufferPool().getByteBuffer(size);

      if (orb.transportDebugFlag) {
        // print address of ByteBuffer gotten from pool
        int bbAddress = System.identityHashCode(byteBuffer);
        StringBuffer sb = new StringBuffer(80);
        sb.append(".read: got ByteBuffer id (");
        sb.append(bbAddress).append(") from ByteBufferPool.");
        String msgStr = sb.toString();
        dprint(msgStr);
      }

      byteBuffer.position(offset);
      byteBuffer.limit(size);

      readFully(byteBuffer, length, max_wait_time);

      return byteBuffer;
    }

    byte[] buf = new byte[size];
    readFully(getSocket().getInputStream(), buf,
        offset, length, max_wait_time);
    ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
    byteBuffer.limit(size);
    return byteBuffer;
  }

  public ByteBuffer read(ByteBuffer byteBuffer, int offset,
      int length, long max_wait_time)
      throws IOException {
    int size = offset + length;
    if (shouldUseDirectByteBuffers()) {

      if (!byteBuffer.isDirect()) {
        throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
      }
      if (size > byteBuffer.capacity()) {
        if (orb.transportDebugFlag) {
          // print address of ByteBuffer being released
          int bbAddress = System.identityHashCode(byteBuffer);
          StringBuffer bbsb = new StringBuffer(80);
          bbsb.append(".read: releasing ByteBuffer id (")
              .append(bbAddress).append(") to ByteBufferPool.");
          String bbmsg = bbsb.toString();
          dprint(bbmsg);
        }
        orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
        byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
      }
      byteBuffer.position(offset);
      byteBuffer.limit(size);
      readFully(byteBuffer, length, max_wait_time);
      byteBuffer.position(0);
      byteBuffer.limit(size);
      return byteBuffer;
    }
    if (byteBuffer.isDirect()) {
      throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
    }
    byte[] buf = new byte[size];
    readFully(getSocket().getInputStream(), buf,
        offset, length, max_wait_time);
    return ByteBuffer.wrap(buf);
  }

  public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time)
      throws IOException {
    int n = 0;
    int bytecount = 0;
    long time_to_wait = readTimeouts.get_initial_time_to_wait();
    long total_time_in_wait = 0;

    // The reading of data incorporates a strategy to detect a
    // rogue client. The strategy is implemented as follows. As
    // long as data is being read, at least 1 byte or more, we
    // assume we have a well behaved client. If no data is read,
    // then we sleep for a time to wait, re-calculate a new time to
    // wait which is lengthier than the previous time spent waiting.
    // Then, if the total time spent waiting does not exceed a
    // maximum time we are willing to wait, we attempt another
    // read. If the maximum amount of time we are willing to
    // spend waiting for more data is exceeded, we throw an
    // IOException.

    // NOTE: Reading of GIOP headers are treated with a smaller
    //       maximum time to wait threshold. Based on extensive
    //       performance testing, all GIOP headers are being
    //       read in 1 read access.

    do {
      bytecount = getSocketChannel().read(byteBuffer);

      if (bytecount < 0) {
        throw new IOException("End-of-stream");
      } else if (bytecount == 0) {
        try {
          Thread.sleep(time_to_wait);
          total_time_in_wait += time_to_wait;
          time_to_wait =
              (long) (time_to_wait * readTimeouts.get_backoff_factor());
        } catch (InterruptedException ie) {
          // ignore exception
          if (orb.transportDebugFlag) {
            dprint("readFully(): unexpected exception "
                + ie.toString());
          }
        }
      } else {
        n += bytecount;
      }
    }
    while (n < size && total_time_in_wait < max_wait_time);

    if (n < size && total_time_in_wait >= max_wait_time) {
      // failed to read entire message
      throw wrapper.transportReadTimeoutExceeded(new Integer(size),
          new Integer(n), new Long(max_wait_time),
          new Long(total_time_in_wait));
    }

    getConnectionCache().stampTime(this);
  }

  // To support non-channel connections.
  public void readFully(java.io.InputStream is, byte[] buf,
      int offset, int size, long max_wait_time)
      throws IOException {
    int n = 0;
    int bytecount = 0;
    long time_to_wait = readTimeouts.get_initial_time_to_wait();
    long total_time_in_wait = 0;

    // The reading of data incorporates a strategy to detect a
    // rogue client. The strategy is implemented as follows. As
    // long as data is being read, at least 1 byte or more, we
    // assume we have a well behaved client. If no data is read,
    // then we sleep for a time to wait, re-calculate a new time to
    // wait which is lengthier than the previous time spent waiting.
    // Then, if the total time spent waiting does not exceed a
    // maximum time we are willing to wait, we attempt another
    // read. If the maximum amount of time we are willing to
    // spend waiting for more data is exceeded, we throw an
    // IOException.

    // NOTE: Reading of GIOP headers are treated with a smaller
    //       maximum time to wait threshold. Based on extensive
    //       performance testing, all GIOP headers are being
    //       read in 1 read access.

    do {
      bytecount = is.read(buf, offset + n, size - n);
      if (bytecount < 0) {
        throw new IOException("End-of-stream");
      } else if (bytecount == 0) {
        try {
          Thread.sleep(time_to_wait);
          total_time_in_wait += time_to_wait;
          time_to_wait =
              (long) (time_to_wait * readTimeouts.get_backoff_factor());
        } catch (InterruptedException ie) {
          // ignore exception
          if (orb.transportDebugFlag) {
            dprint("readFully(): unexpected exception "
                + ie.toString());
          }
        }
      } else {
        n += bytecount;
      }
    }
    while (n < size && total_time_in_wait < max_wait_time);

    if (n < size && total_time_in_wait >= max_wait_time) {
      // failed to read entire message
      throw wrapper.transportReadTimeoutExceeded(new Integer(size),
          new Integer(n), new Long(max_wait_time),
          new Long(total_time_in_wait));
    }

    getConnectionCache().stampTime(this);
  }

  public void write(ByteBuffer byteBuffer)
      throws IOException {
    if (shouldUseDirectByteBuffers()) {
            /* NOTE: cannot perform this test.  If one ask for a
               ByteBuffer from the pool which is bigger than the size
               of ByteBuffers managed by the pool, then the pool will
               return a HeapByteBuffer.
            if (byteBuffer.hasArray()) {
                throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
            }
            */
      // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
      //            all bytes are written on first write attempt.
      do {
        getSocketChannel().write(byteBuffer);
      }
      while (byteBuffer.hasRemaining());

    } else {
      if (!byteBuffer.hasArray()) {
        throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
      }
      byte[] tmpBuf = byteBuffer.array();
      getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
      getSocket().getOutputStream().flush();
    }

    // TimeStamp connection to indicate it has been used
    // Note granularity of connection usage is assumed for
    // now to be that of a IIOP packet.
    getConnectionCache().stampTime(this);
  }

  /**
   * Note:it is possible for this to be called more than once
   */
  public synchronized void close() {
    try {
      if (orb.transportDebugFlag) {
        dprint(".close->: " + this);
      }
      writeLock();

      // REVISIT It will be good to have a read lock on the reader thread
      // before we proceed further, to avoid the reader thread (server side)
      // from processing requests. This avoids the risk that a new request
      // will be accepted by ReaderThread while the ListenerThread is
      // attempting to close this connection.

      if (isBusy()) { // we are busy!
        writeUnlock();
        if (orb.transportDebugFlag) {
          dprint(".close: isBusy so no close: " + this);
        }
        return;
      }

      try {
        try {
          sendCloseConnection(GIOPVersion.V1_0);
        } catch (Throwable t) {
          wrapper.exceptionWhenSendingCloseConnection(t);
        }

        synchronized (stateEvent) {
          state = CLOSE_SENT;
          stateEvent.notifyAll();
        }

        // stop the reader without causing it to do purgeCalls
        //Exception ex = new Exception();
        //reader.stop(ex); // REVISIT

        // NOTE: !!!!!!
        // This does writeUnlock().
        purgeCalls(wrapper.connectionRebind(), false, true);

      } catch (Exception ex) {
        if (orb.transportDebugFlag) {
          dprint(".close: exception: " + this, ex);
        }
      }
      try {
        Selector selector = orb.getTransportManager().getSelector(0);
        selector.unregisterForEvent(this);
        if (socketChannel != null) {
          socketChannel.close();
        }
        socket.close();
      } catch (IOException e) {
        if (orb.transportDebugFlag) {
          dprint(".close: " + this, e);
        }
      }
      closeConnectionResources();
    } finally {
      if (orb.transportDebugFlag) {
        dprint(".close<-: " + this);
      }
    }
  }

  public void closeConnectionResources() {
    if (orb.transportDebugFlag) {
      dprint(".closeConnectionResources->: " + this);
    }
    Selector selector = orb.getTransportManager().getSelector(0);
    selector.unregisterForEvent(this);
    try {
      if (socketChannel != null) {
        socketChannel.close();
      }
      if (socket != null && !socket.isClosed()) {
        socket.close();
      }
    } catch (IOException e) {
      if (orb.transportDebugFlag) {
        dprint(".closeConnectionResources: " + this, e);
      }
    }
    if (orb.transportDebugFlag) {
      dprint(".closeConnectionResources<-: " + this);
    }
  }


  public Acceptor getAcceptor() {
    return acceptor;
  }

  public ContactInfo getContactInfo() {
    return contactInfo;
  }

  public EventHandler getEventHandler() {
    return this;
  }

  public OutputObject createOutputObject(MessageMediator messageMediator) {
    // REVISIT - remove this method from Connection and all it subclasses.
    throw new RuntimeException(
        "*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
  }

  // This is used by the GIOPOutputObject in order to
  // throw the correct error when handling code sets.
  // Can we determine if we are on the server side by
  // other means?  XREVISIT
  public boolean isServer() {
    return isServer;
  }

  public boolean isBusy() {
    if (serverRequestCount > 0 ||
        getResponseWaitingRoom().numberRegistered() > 0) {
      return true;
    } else {
      return false;
    }
  }

  public long getTimeStamp() {
    return timeStamp;
  }

  public void setTimeStamp(long time) {
    timeStamp = time;
  }

  public void setState(String stateString) {
    synchronized (stateEvent) {
      if (stateString.equals("ESTABLISHED")) {
        state = ESTABLISHED;
        stateEvent.notifyAll();
      } else {
        // REVISIT: ASSERT
      }
    }
  }

  /**
   * Sets the writeLock for this connection.
   * If the writeLock is already set by someone else, block till the
   * writeLock is released and can set by us.
   * IMPORTANT: this connection's lock must be acquired before
   * setting the writeLock and must be unlocked after setting the writeLock.
   */
  public void writeLock() {
    try {
      if (dprintWriteLocks && orb.transportDebugFlag) {
        dprint(".writeLock->: " + this);
      }
      // Keep looping till we can set the writeLock.
      while (true) {
        int localState = state;
        switch (localState) {

          case OPENING:
            synchronized (stateEvent) {
              if (state != OPENING) {
                // somebody has changed 'state' so be careful
                break;
              }
              try {
                stateEvent.wait();
              } catch (InterruptedException ie) {
                if (orb.transportDebugFlag) {
                  dprint(".writeLock: OPENING InterruptedException: " + this);
                }
              }
            }
            // Loop back
            break;

          case ESTABLISHED:
            synchronized (writeEvent) {
              if (!writeLocked) {
                writeLocked = true;
                return;
              }

              try {
                // do not stay here too long if state != ESTABLISHED
                // Bug 4752117
                while (state == ESTABLISHED && writeLocked) {
                  writeEvent.wait(100);
                }
              } catch (InterruptedException ie) {
                if (orb.transportDebugFlag) {
                  dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
                }
              }
            }
            // Loop back
            break;

          //
          // XXX
          // Need to distinguish between client and server roles
          // here probably.
          //
          case ABORT:
            synchronized (stateEvent) {
              if (state != ABORT) {
                break;
              }
              throw wrapper.writeErrorSend();
            }

          case CLOSE_RECVD:
            // the connection has been closed or closing
            // ==> throw rebind exception
            synchronized (stateEvent) {
              if (state != CLOSE_RECVD) {
                break;
              }
              throw wrapper.connectionCloseRebind();
            }

          default:
            if (orb.transportDebugFlag) {
              dprint(".writeLock: default: " + this);
            }
            // REVISIT
            throw new RuntimeException(".writeLock: bad state");
        }
      }
    } finally {
      if (dprintWriteLocks && orb.transportDebugFlag) {
        dprint(".writeLock<-: " + this);
      }
    }
  }

  public void writeUnlock() {
    try {
      if (dprintWriteLocks && orb.transportDebugFlag) {
        dprint(".writeUnlock->: " + this);
      }
      synchronized (writeEvent) {
        writeLocked = false;
        writeEvent.notify(); // wake up one guy waiting to write
      }
    } finally {
      if (dprintWriteLocks && orb.transportDebugFlag) {
        dprint(".writeUnlock<-: " + this);
      }
    }
  }

  // Assumes the caller handles writeLock and writeUnlock
  public void sendWithoutLock(OutputObject outputObject) {
    // Don't we need to check for CloseConnection
    // here?  REVISIT

    // XREVISIT - Shouldn't the MessageMediator
    // be the one to handle writing the data here?

    try {

      // Write the fragment/message

      CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
      cdrOutputObject.writeTo(this);
      // REVISIT - no flush?
      //socket.getOutputStream().flush();

    } catch (IOException e1) {

            /*
             * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
             * sending a CancelRequest for regular requests / locate requests
             */

      // Since IIOPOutputStream's msgheader is set only once, and not
      // altered during sending multiple fragments, the original
      // msgheader will always have the requestId.
      // REVISIT This could be optimized to send a CancelRequest only
      // if any fragments had been sent already.

            /* REVISIT: MOVE TO SUBCONTRACT
            Message msg = os.getMessage();
            if (msg.getType() == Message.GIOPRequest ||
                    msg.getType() == Message.GIOPLocateRequest) {
                GIOPVersion requestVersion = msg.getGIOPVersion();
                int requestId = MessageBase.getRequestId(msg);
                try {
                    sendCancelRequest(requestVersion, requestId);
                } catch (IOException e2) {
                    // most likely an abortive connection closure.
                    // ignore, since nothing more can be done.
                    if (orb.transportDebugFlag) {

                }
            }
            */

      // REVISIT When a send failure happens, purgeCalls() need to be
      // called to ensure that the connection is properly removed from
      // further usage (ie., cancelling pending requests with COMM_FAILURE
      // with an appropriate minor_code CompletionStatus.MAY_BE).

      // Relying on the IIOPOutputStream (as noted below) is not
      // sufficient as it handles COMM_FAILURE only for the final
      // fragment (during invoke processing). Note that COMM_FAILURE could
      // happen while sending the initial fragments.
      // Also the IIOPOutputStream does not properly close the connection.
      // It simply removes the connection from the table. An orderly
      // closure is needed (ie., cancel pending requests on the connection
      // COMM_FAILURE as well.

      // IIOPOutputStream will cleanup the connection info when it
      // sees this exception.
      SystemException exc = wrapper.writeErrorSend(e1);
      purgeCalls(exc, false, true);
      throw exc;
    }
  }

  public void registerWaiter(MessageMediator messageMediator) {
    responseWaitingRoom.registerWaiter(messageMediator);
  }

  public void unregisterWaiter(MessageMediator messageMediator) {
    responseWaitingRoom.unregisterWaiter(messageMediator);
  }

  public InputObject waitForResponse(MessageMediator messageMediator) {
    return responseWaitingRoom.waitForResponse(messageMediator);
  }

  public void setConnectionCache(ConnectionCache connectionCache) {
    this.connectionCache = connectionCache;
  }

  public ConnectionCache getConnectionCache() {
    return connectionCache;
  }

  ////////////////////////////////////////////////////
  //
  // EventHandler methods
  //

  public void setUseSelectThreadToWait(boolean x) {
    useSelectThreadToWait = x;
    // REVISIT - Reading of a GIOP header only is information
    //           that should be passed into the constructor
    //           from the SocketOrChannelConnection factory.
    setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
  }

  public void handleEvent() {
    if (orb.transportDebugFlag) {
      dprint(".handleEvent->: " + this);
    }
    getSelectionKey().interestOps(getSelectionKey().interestOps() &
        (~getInterestOps()));

    if (shouldUseWorkerThreadForEvent()) {
      Throwable throwable = null;
      try {
        int poolToUse = 0;
        if (shouldReadGiopHeaderOnly()) {
          partialMessageMediator = readBits();
          poolToUse =
              partialMessageMediator.getThreadPoolToUse();
        }

        if (orb.transportDebugFlag) {
          dprint(".handleEvent: addWork to pool: " + poolToUse);
        }
        orb.getThreadPoolManager().getThreadPool(poolToUse)
            .getWorkQueue(0).addWork(getWork());
      } catch (NoSuchThreadPoolException e) {
        throwable = e;
      } catch (NoSuchWorkQueueException e) {
        throwable = e;
      }
      // REVISIT: need to close connection.
      if (throwable != null) {
        if (orb.transportDebugFlag) {
          dprint(".handleEvent: " + throwable);
        }
        INTERNAL i = new INTERNAL("NoSuchThreadPoolException");
        i.initCause(throwable);
        throw i;
      }
    } else {
      if (orb.transportDebugFlag) {
        dprint(".handleEvent: doWork");
      }
      getWork().doWork();
    }
    if (orb.transportDebugFlag) {
      dprint(".handleEvent<-: " + this);
    }
  }

  public SelectableChannel getChannel() {
    return socketChannel;
  }

  public int getInterestOps() {
    return SelectionKey.OP_READ;
  }

  //    public Acceptor getAcceptor() - already defined above.

  public Connection getConnection() {
    return this;
  }

  ////////////////////////////////////////////////////
  //
  // Work methods.
  //

  public String getName() {
    return this.toString();
  }

  public void doWork() {
    try {
      if (orb.transportDebugFlag) {
        dprint(".doWork->: " + this);
      }

      // IMPORTANT: Sanity checks on SelectionKeys such as
      //            SelectorKey.isValid() should not be done
      //            here.
      //

      if (!shouldReadGiopHeaderOnly()) {
        read();
      } else {
        // get the partialMessageMediator
        // created by SelectorThread
        CorbaMessageMediator messageMediator =
            this.getPartialMessageMediator();

        // read remaining info needed in a MessageMediator
        messageMediator = finishReadingBits(messageMediator);

        if (messageMediator != null) {
          // Null can happen when client closes stream
          // causing purgecalls.
          dispatch(messageMediator);
        }
      }
    } catch (Throwable t) {
      if (orb.transportDebugFlag) {
        dprint(".doWork: ignoring Throwable: "
            + t
            + " " + this);
      }
    } finally {
      if (orb.transportDebugFlag) {
        dprint(".doWork<-: " + this);
      }
    }
  }

  public void setEnqueueTime(long timeInMillis) {
    enqueueTime = timeInMillis;
  }

  public long getEnqueueTime() {
    return enqueueTime;
  }

  ////////////////////////////////////////////////////
  //
  // spi.transport.CorbaConnection.
  //

  // IMPORTANT: Reader Threads must NOT read Giop header only.
  public boolean shouldReadGiopHeaderOnly() {
    return shouldReadGiopHeaderOnly;
  }

  protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
    shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
  }

  public ResponseWaitingRoom getResponseWaitingRoom() {
    return responseWaitingRoom;
  }

  // REVISIT - inteface defines isServer but already defined in
  // higher interface.

  public void serverRequestMapPut(int requestId,
      CorbaMessageMediator messageMediator) {
    serverRequestMap.put(new Integer(requestId), messageMediator);
  }

  public CorbaMessageMediator serverRequestMapGet(int requestId) {
    return (CorbaMessageMediator)
        serverRequestMap.get(new Integer(requestId));
  }

  public void serverRequestMapRemove(int requestId) {
    serverRequestMap.remove(new Integer(requestId));
  }


  // REVISIT: this is also defined in:
  // com.sun.corba.se.spi.legacy.connection.Connection
  public java.net.Socket getSocket() {
    return socket;
  }

  /**
   * It is possible for a Close Connection to have been
   * * sent here, but we will not check for this. A "lazy"
   * * Exception will be thrown in the Worker thread after the
   * * incoming request has been processed even though the connection
   * * is closed before the request is processed. This is o.k because
   * * it is a boundary condition. To prevent it we would have to add
   * * more locks which would reduce performance in the normal case.
   **/
  public synchronized void serverRequestProcessingBegins() {
    serverRequestCount++;
  }

  public synchronized void serverRequestProcessingEnds() {
    serverRequestCount--;
  }

  //
  //
  //

  public synchronized int getNextRequestId() {
    return requestId++;
  }

  // Negotiated code sets for char and wchar data
  protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;

  public ORB getBroker() {
    return orb;
  }

  public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
    // Needs to be synchronized for the following case when the client
    // doesn't send the code set context twice, and we have two threads
    // in ServerRequestDispatcher processCodeSetContext.
    //
    // Thread A checks to see if there is a context, there is none, so
    //     it calls setCodeSetContext, getting the synch lock.
    // Thread B checks to see if there is a context.  If we didn't synch,
    //     it might decide to outlaw wchar/wstring.
    if (codeSetContext == null) {
      synchronized (this) {
        return codeSetContext;
      }
    }

    return codeSetContext;
  }

  public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
    // Double check whether or not we need to do this
    if (codeSetContext == null) {

      if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
          OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
        // If the client says it's negotiated a code set that
        // isn't a fallback and we never said we support, then
        // it has a bug.
        throw wrapper.badCodesetsFromClient();
      }

      codeSetContext = csc;
    }
  }

  //
  // from iiop.IIOPConnection.java
  //

  // Map request ID to an InputObject.
  // This is so the client thread can start unmarshaling
  // the reply and remove it from the out_calls map while the
  // ReaderThread can still obtain the input stream to give
  // new fragments.  Only the ReaderThread touches the clientReplyMap,
  // so it doesn't incur synchronization overhead.

  public MessageMediator clientRequestMapGet(int requestId) {
    return responseWaitingRoom.getMessageMediator(requestId);
  }

  protected MessageMediator clientReply_1_1;

  public void clientReply_1_1_Put(MessageMediator x) {
    clientReply_1_1 = x;
  }

  public MessageMediator clientReply_1_1_Get() {
    return clientReply_1_1;
  }

  public void clientReply_1_1_Remove() {
    clientReply_1_1 = null;
  }

  protected MessageMediator serverRequest_1_1;

  public void serverRequest_1_1_Put(MessageMediator x) {
    serverRequest_1_1 = x;
  }

  public MessageMediator serverRequest_1_1_Get() {
    return serverRequest_1_1;
  }

  public void serverRequest_1_1_Remove() {
    serverRequest_1_1 = null;
  }

  protected String getStateString(int state) {
    synchronized (stateEvent) {
      switch (state) {
        case OPENING:
          return "OPENING";
        case ESTABLISHED:
          return "ESTABLISHED";
        case CLOSE_SENT:
          return "CLOSE_SENT";
        case CLOSE_RECVD:
          return "CLOSE_RECVD";
        case ABORT:
          return "ABORT";
        default:
          return "???";
      }
    }
  }

  public synchronized boolean isPostInitialContexts() {
    return postInitialContexts;
  }

  // Can never be unset...
  public synchronized void setPostInitialContexts() {
    postInitialContexts = true;
  }

  /**
   * Wake up the outstanding requests on the connection, and hand them
   * COMM_FAILURE exception with a given minor code.
   *
   * Also, delete connection from connection table and
   * stop the reader thread.
   *
   * Note that this should only ever be called by the Reader thread for
   * this connection.
   *
   * @param minor_code The minor code for the COMM_FAILURE major code.
   * @param die Kill the reader thread (this thread) before exiting.
   */
  public void purgeCalls(SystemException systemException,
      boolean die, boolean lockHeld) {
    int minor_code = systemException.minor;

    try {
      if (orb.transportDebugFlag) {
        dprint(".purgeCalls->: "
            + minor_code + "/" + die + "/" + lockHeld
            + " " + this);
      }

      // If this invocation is a result of ThreadDeath caused
      // by a previous execution of this routine, just exit.

      synchronized (stateEvent) {
        if ((state == ABORT) || (state == CLOSE_RECVD)) {
          if (orb.transportDebugFlag) {
            dprint(".purgeCalls: exiting since state is: "
                + getStateString(state)
                + " " + this);
          }
          return;
        }
      }

      // Grab the writeLock (freeze the calls)
      try {
        if (!lockHeld) {
          writeLock();
        }
      } catch (SystemException ex) {
        if (orb.transportDebugFlag) {
          dprint(".purgeCalls: SystemException" + ex
              + "; continuing " + this);
        }
      }

      // Mark the state of the connection
      // and determine the request status
      org.omg.CORBA.CompletionStatus completion_status;
      synchronized (stateEvent) {
        if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
          state = CLOSE_RECVD;
          systemException.completed = CompletionStatus.COMPLETED_NO;
        } else {
          state = ABORT;
          systemException.completed = CompletionStatus.COMPLETED_MAYBE;
        }
        stateEvent.notifyAll();
      }

      try {
        socket.getInputStream().close();
        socket.getOutputStream().close();
        socket.close();
      } catch (Exception ex) {
        if (orb.transportDebugFlag) {
          dprint(".purgeCalls: Exception closing socket: " + ex
              + " " + this);
        }
      }

      // Signal all threads with outstanding requests on this
      // connection and give them the SystemException;

      responseWaitingRoom.signalExceptionToAllWaiters(systemException);
    } finally {
      if (contactInfo != null) {
        ((OutboundConnectionCache) getConnectionCache()).remove(contactInfo);
      } else if (acceptor != null) {
        ((InboundConnectionCache) getConnectionCache()).remove(this);
      }

      //
      // REVISIT: Stop the reader thread
      //

      // Signal all the waiters of the writeLock.
      // There are 4 types of writeLock waiters:
      // 1. Send waiters:
      // 2. SendReply waiters:
      // 3. cleanUp waiters:
      // 4. purge_call waiters:
      //

      writeUnlock();

      if (orb.transportDebugFlag) {
        dprint(".purgeCalls<-: "
            + minor_code + "/" + die + "/" + lockHeld
            + " " + this);
      }
    }
  }

  /*************************************************************************
   * The following methods are for dealing with Connection cleaning for
   * better scalability of servers in high network load conditions.
   **************************************************************************/

  public void sendCloseConnection(GIOPVersion giopVersion)
      throws IOException {
    Message msg = MessageBase.createCloseConnection(giopVersion);
    sendHelper(giopVersion, msg);
  }

  public void sendMessageError(GIOPVersion giopVersion)
      throws IOException {
    Message msg = MessageBase.createMessageError(giopVersion);
    sendHelper(giopVersion, msg);
  }

  /**
   * Send a CancelRequest message. This does not lock the connection, so the
   * caller needs to ensure this method is called appropriately.
   *
   * @throws IOException - could be due to abortive connection closure.
   */
  public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
      throws IOException {

    Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
    sendHelper(giopVersion, msg);
  }

  protected void sendHelper(GIOPVersion giopVersion, Message msg)
      throws IOException {
    // REVISIT: See comments in CDROutputObject constructor.
    CDROutputObject outputObject =
        sun.corba.OutputStreamFactory.newCDROutputObject((ORB) orb, null, giopVersion,
            this, msg, ORBConstants.STREAM_FORMAT_VERSION_1);
    msg.write(outputObject);

    outputObject.writeTo(this);
  }

  public void sendCancelRequestWithLock(GIOPVersion giopVersion,
      int requestId)
      throws IOException {
    writeLock();
    try {
      sendCancelRequest(giopVersion, requestId);
    } finally {
      writeUnlock();
    }
  }

  // Begin Code Base methods ---------------------------------------
  //
  // Set this connection's code base IOR.  The IOR comes from the
  // SendingContext.  This is an optional service context, but all
  // JavaSoft ORBs send it.
  //
  // The set and get methods don't need to be synchronized since the
  // first possible get would occur during reading a valuetype, and
  // that would be after the set.

  // Sets this connection's code base IOR.  This is done after
  // getting the IOR out of the SendingContext service context.
  // Our ORBs always send this, but it's optional in CORBA.

  public final void setCodeBaseIOR(IOR ior) {
    codeBaseServerIOR = ior;
  }

  public final IOR getCodeBaseIOR() {
    return codeBaseServerIOR;
  }

  // Get a CodeBase stub to use in unmarshaling.  The CachedCodeBase
  // won't connect to the remote codebase unless it's necessary.
  public final CodeBase getCodeBase() {
    return cachedCodeBase;
  }

  // End Code Base methods -----------------------------------------

  // set transport read thresholds
  protected void setReadTimeouts(ReadTimeouts readTimeouts) {
    this.readTimeouts = readTimeouts;
  }

  protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
    partialMessageMediator = messageMediator;
  }

  protected CorbaMessageMediator getPartialMessageMediator() {
    return partialMessageMediator;
  }

  public String toString() {
    synchronized (stateEvent) {
      return
          "SocketOrChannelConnectionImpl[" + " "
              + (socketChannel == null ?
              socket.toString() : socketChannel.toString()) + " "
              + getStateString(state) + " "
              + shouldUseSelectThreadToWait() + " "
              + shouldUseWorkerThreadForEvent() + " "
              + shouldReadGiopHeaderOnly()
              + "]";
    }
  }

  // Must be public - used in encoding.
  public void dprint(String msg) {
    ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
  }

  protected void dprint(String msg, Throwable t) {
    dprint(msg);
    t.printStackTrace(System.out);
  }
}

// End of file.
